package com.hao.chapter11;

import com.hao.chapter05.ClickSource;
import com.hao.chapter05.Event;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class ProcTimeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1.在创建表的DDL中直接定义时间属性
        String creatDDL = "CREATE TABLE clickTable (" +
                "user_name STRING," +
                "url STRING," +
                "ts PROCTIME()" +
                ")WITH(" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";

        tableEnv.executeSql(creatDDL);


        //2.在数据流中转换表定义时间属性
        DataStream<Event> eventStream = env.addSource(new ClickSource());

        Table eventTable = tableEnv.fromDataStream(eventStream, $("user"), $("url"), $("timestamp").as("ts")
                , $("et").proctime());

        eventTable.printSchema();


        env.execute();
    }
}
